《Java并发编程实战》第七章 取消与关闭 读书笔记

取消操作的原因:

  • 用户请求取消
  • 有时间限制的操作
  • 应用程序事件
  • 错误
  • 关闭

任务中断的三种方式:

使用请求关闭标记(例如boolean开关)

成功的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
public class PrimeGenerator implements Runnable {
private volatile boolean cancelled;
private final List<BigInteger> primes = new ArrayList<BigInteger>();
@Override
public void run() {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
synchronized (this) {
primes.add(p);
}
}
}
public void cancel() {
cancelled = true;
}
public synchronized List<BigInteger> get() {
return new ArrayList<BigInteger>(primes);
}
}

Test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.junit.Test;
public class PrimeGeneratorTest {
@Test
public void test() {
PrimeGenerator primeGenerator = new PrimeGenerator();
new Thread(primeGenerator).start();
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
primeGenerator.cancel();
}
System.out.println(primeGenerator.get());
}
}

失败的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.math.BigInteger;
import java.util.concurrent.BlockingQueue;
public class BrokenPrimeGenerator implements Runnable {
private final BlockingQueue<BigInteger> queue;
private volatile boolean cancelled = false;
public BrokenPrimeGenerator(BlockingQueue<BigInteger> queue) {
this.queue = queue;
}
public void run() {
try {
BigInteger p = BigInteger.ONE;
while (!cancelled) {
p = p.nextProbablePrime();
queue.put(p);
}
} catch (InterruptedException exception) {
}
}
public void cancel() {
this.cancelled = true;
}
}

Test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import java.math.BigInteger;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BrokenPrimeGeneratorTest {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<BigInteger> primes = new ArrayBlockingQueue<BigInteger>(
10);
BrokenPrimeGenerator brokenPrimeGenerator = new BrokenPrimeGenerator(
primes);
new Thread(brokenPrimeGenerator).start();
Thread.sleep(1000);
try {
while (needMorePrimes()) {
System.out.println(primes.take());
}
} finally {
brokenPrimeGenerator.cancel();
}
}
public static boolean needMorePrimes() {
// Dummy Method called by client
return false;
}
}

使用中断机制

与中断相关的

1
2
3
4
5
public class Thread{
public void interrupt(){...} // 1) set interrupted to true 2) Call native method to stop current thread.
public boolean isInterrupted() {...}
public static boolean interrupted() {...} // Clears the interrupted status of the thread and returns its previous value
}

InterruptedException异常:
针对上面的BlockingQueue的put方法是阻塞的,但是线程会检检测到中断的发生,并抛出InterruptedException,来提早结束任务。避免了阻塞导致线程一直等待的情况发生。

实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class BlockingInterruptTest {
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().isInterrupted()); // "false" will be printed
e.printStackTrace();
}
}
});
t.start();
Thread.sleep(10L);
t.interrupt();
final BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
queue.put("A");
t = new Thread(new Runnable() {
@Override
public void run() {
try {
queue.put("B");
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().isInterrupted()); //"false" will be printed
e.printStackTrace();
}
}
});
t.start();
Thread.sleep(10L);
t.interrupt();
}
}

通过Future来实现取消

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
import java.util.concurrent.*;
/**
* Created by huazi on 17/7/4.
*/
public class Test implements Callable<String> {
@Override
public String call() throws Exception {
while (true) {
System.out.println("Task: Test\n");
Thread.sleep(100);
}
}
}
class Main {
public static void main(String[] args) {
//1.创建task类
//2.实现callable接口
//3.使用Executors类的newCachedThreadPool()方法创建ThreadPoolExecutor对象
ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
//4.创建Task对象
Test task = new Test();
//5.使用submit()方法提交任务给执行者
System.out.println("Main Executing the Task\n");
Future<String> result = executor.submit(task);
//6.使主任务睡眠2秒
try {
//使用timeUnit类将参数单位设置为秒
TimeUnit.SECONDS.sleep(2);
} catch (Exception e) {
e.printStackTrace();
}
//7.使用通过submit()方法返回的Future对象result的cancel()方法,
//取消任务的执行。传入true值作为cancel方法的参数。
System.out.println("Main:Canceling the Task\n");
result.cancel(true);
//8.将isCancelled()方法和isDone()的调用结果写入控制台,验证任务已取消,已完成
System.out.printf("Main:Canceled:%s\n",result.isCancelled());
System.out.printf("Main:Done:%s\n",result.isDone());
//9.使用shutdown()方法结束执行者,写入信息(到控制台),表明程序结束
executor.shutdown();
System.out.println("Main:The executor has finished");
}
}

响应中断

阻塞库方法,例如Thread.sleep和Object.wait等,都会检杏线程何时中断,并且在发现中断时提前返回。它们在响应中断时执行的操作包括:
1.清除中断状态
2.抛出InterruptedException
表示阻塞操作由于中断而提前结束。JVM并不能保证阻塞方法检测到中断的速度,但在实际情况中响应速度还是非常快的。

响应中断的方法

1.传递异常(throws InterruptedException)

2.恢复中断状态,从而事调用栈的上层代码能够对其进行处理(Thread.currentThread().interrupt();)
例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Test implements Runnable {
private BlockingQueue<String> queue;
public Test(BlockingQueue<String> queue) {
this.queue = queue;
}
public void run() {
try {
while (true) {
String task = queue.take();
System.out.println(task);
}
} catch (InterruptedException e) {
// Restore the interrupted status
// 当抛出中断异常后,如果此时不能向上抛出,则需要恢复中断状态,否则中断状态会丢失
Thread.currentThread().interrupt();
}
}
public static void main(String[] args) {
BlockingQueue<String> queue = new ArrayBlockingQueue<String>(1);
Test test = new Test(queue);
Thread t = new Thread(test);
t.start();
t.interrupt();
}
}

处理不可中断的阻塞

Java.io包中的同步Socket I/O。虽然InputStream和OutputStream中的read和write等方法都不会响应中断,但通过关闭底层的套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个SocketException。

Java.io包中的同步I/O。当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptedException)并关闭链路(这还会使得其他在这条链路上阻塞的线程同样抛出ClosedByInterruptException)。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程抛出AsynchronousCloseException。大多数标准的Channel都实现了InterruptibleChannel。

Selector的异步I/O。如果一个线程在调用Selector.select方法(在java.nio.channels中)时阻塞了,那么调用close或wakeup方法会使线程抛出ClosedSelectorException并提前返回。

获取某个锁。如果一个线程由于等待某个内置锁而被阻塞,那么将无法响应中断,因为线程认为它肯定获得锁,所以将不会理会中断请求。但是,在Lock类中提供了lockInterruptibly方法,该方法允许在等待一个锁的同时仍能响应中断。

采用newTaskFor来封装非标准的取消

停止基于线程的服务

日志服务示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class TrackingExecutor extends AbstractExecutorService {
private final ExecutorService exec;
private final Set<Runnable> tasksCancelledAtShutdown =
Collections.synchronizedSet(new HashSet<Runnable>());
public TrackingExecutor(ExecutorService exec) {
this.exec = exec;
}
public List<Runnable> getCancelledTasks() {//返回被取消的任务
if (!exec.isTerminated())//如果shutdownNow未调用或调用未完成时
throw new IllegalStateException(/*...*/);
return new ArrayList<Runnable>(tasksCancelledAtShutdown);
}
public void execute(final Runnable runnable) {
exec.execute(new Runnable() {
public void run() {
try {
runnable.run();
/*参考:http://blog.csdn.net/coslay/article/details/48038795
* 实质上在这里会有线程安全性问题,存在着竞争条件,比如程序刚
* 好运行到这里,即任务任务(run方法)刚好运行完,这时外界调用
* 了shutdownNow(),这时下面finally块中的判断会有出错,明显示
* 任务已执行完成,但判断给出的是被取消了。如果要想安全,就不
* 应该让shutdownNow在run方法运行完成与下面判断前调用。我们要
* 将runnable.run()与下面的if放在一个同步块、而且还要将
* shutdownNow的调用也放同步块里并且与前面要是同一个监视器锁,
* 这样好像就可以解决了,不知道对不能。书上也没有说能不能解决,
* 只是说有这个问题!但反过来想,如果真的这样同步了,那又会带
* 性能上的问题,因为什么所有的任务都会串形执行,这样还要
* ExecutorService线程池干嘛呢?我想这就是后面作者为什么所说
* 这是“不可避免的竞争条件”
*/
} finally {
//如果调用了shutdownNow且运行的任务被中断
if (isShutdown()
&& Thread.currentThread().isInterrupted())
tasksCancelledAtShutdown.add(runnable);//记录被取消的任务
}
}
});
}
// 将ExecutorService 中的其他方法委托到exec
}

关闭ExecutorService

  shutdown():启动一次顺序关闭,执行完以前提交的任务,没有执行完的任务继续执行完。

  shutdownNow():试图停止所有正在执行的任务(向它们发出interrupt操作语法,无法保证能够停止正在处理的任务线程,但是会尽力尝试),并暂停处理正在等待的任务,并返回等待执行的任务列表。

  ExecutorService已关闭,再向它提交任务时会抛RejectedExecutionException异常
  
  只要调用了这两个关闭方法中的任意一个,isShutdown方法就会返回true。当所有的任务都已关闭后,才表示线程池关闭成功,这时调用isTerminaed方法会返回true。   

  至于应该调用哪一种方法来关闭线程池,应该由提交到线程池的任务特性决定,通常调用shutdown方法来关闭线程池,如果任务不一定要执行完,则可以调用shutdownNow方法。

“毒丸”对象——当得到这个对象时,立即停止

  在提交“毒丸”对象之前提交的所有工作都会被处理,而生产者在提交了“毒丸”对象后,将不会再提交任何工作  

只执行一次的服务

  如果某个方法需要处理一批任务,并且当所有任务都处理完成后才返回,那么可以通过一次私有的Executor来简化服务的生命周期管理,其中该Executor的生命周期是由这个方法来控制的。  

处理非正常的线程终止

在一个线程中启动另一个线程,另一个线程中抛出异常,如果没有捕获它,这个异常也不会传递到父线程中

  任何代码都可能抛出一个RuntimeException。每当调用另一个方法时,都要对它的行为保持怀疑,不要盲目地认为它一定会正常返回,或者一定会抛出在方法原型中声明的某个已检查异常
  

1
2
3
4
5
6
7
8
9
10
11
12
  //如果任务抛出了一个运行时异常,它将允许线程终结,但是会首先通知框架:线程已经终结
public void run() {//工作者线程的实现
Throwable thrown = null;
try {
while (!isInterrupted())
runTask(getTaskFromWorkQueue());
} catch (Throwable e) {//为了安全,捕获的所有异常
thrown = e;//保留异常信息
} finally {
threadExited(this, thrown);// 重新将异常抛给框架后终结工作线程
}
}

未捕获异常的处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(new UncaughtException.Run());
t.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("uncaughtExceptionHandler catch a Exception---------");
System.out.println(e.getMessage());
}
});
t.start();
Thread.sleep(100);
}
static class Run implements Runnable{
@Override
public void run() {
System.out.println("runnable run---------------");
int i = 1/0;
}
}

jvm关闭

JVM的关闭意味着将停止系统中所有的任务,可以由其自动关闭也可以主动触发。

关闭钩子

如果我们想在JVM关闭时做一些事情该怎么办?JVM提供给开发者提供了关闭钩子,使其在JVM关闭时可以利用一个线程来做一些收尾工作例如(删除临时文件)。注册关闭钩子的方法是调用Runtime.addShutdownHook

守护线程 一个线程来执行一些辅助工作,但有不希望这个线程阻碍JVM的关闭

守护线程用于执行一些辅助任务,如垃圾回收,JVM关闭时不论守护线程运行到哪里都可能马上停止。

终结器(清理文件句柄或套接字句柄等)——避免使用

垃圾回收器对那些定义了finalize方法的对象会进行特殊处理:在回收器释放它们后,调用它们的finalize方法,从而确保一些持久化的资源被释放。

  通过使用finally代码块和显式的close方法,能够比使用终结器更好地管理资源

例外:当需要管理对象时,并且该对象持有的资源是通过本地方法获得的

参考

第七章:取消与关闭——Java并发编程实战

《Java并发编程实战》第七章 取消与关闭 读书笔记

JVM关闭及线程的退出

Java线程未捕获异常处理

《Java并发编程的艺术》

          1. 《Java并发编程实战》
欢迎大家关注:huazi's微信公众号